package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/**
 * Demonstrates a keyed count window.
 *
 * <p>Windows fall into two categories, depending on whether the stream is keyed before the
 * window is defined: keyed windows and non-keyed windows.
 *
 * <p>Calling {@code keyBy} before defining the window yields a keyed window; under the hood
 * it is built through the {@code window} method.
 *
 * <p>For a keyed window, the tasks of the window and of the window operator may run with a
 * parallelism of one or more.
 *
 * <p>A {@code countWindow} defined after {@code keyBy} runs in parallel: as soon as one key has
 * accumulated the configured number of records, the window of that key alone fires.
 *
 * <p>Input is read from a socket, one {@code word,count} record per line (for example
 * {@code spark,4}). Lines that do not have that shape are reported on stderr and skipped, so
 * that a single typo does not throw inside an operator and fail the whole job.
 */
public class CountWindowDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Expected input, one record per line:
        //spark,4
        //hive,5
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Discard malformed lines up front; the map below can then parse without throwing.
        SingleOutputStreamOperator<String> validLines = lines.filter(CountWindowDemo::isWellFormed);

        // Map each "word,count" line to a (word, count) tuple.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = validLines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String word = fields[0];
                // Trim only the count so that "spark, 4" parses; the word (the key) is kept as typed.
                int count = Integer.parseInt(fields[1].trim());
                return Tuple2.of(word, count);
            }
        });

        // Key by the word first, then define the window.
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> windowedStream = keyedStream.countWindow(5);
        // Defining the window is not enough: a window operator (here a sum over field 1) must be applied.
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();


    }

    /**
     * Checks whether a line can be parsed as a {@code word,count} record.
     *
     * <p>A line is accepted when it is non-null, splits on {@code ','} into at least two fields,
     * and the second field (ignoring surrounding whitespace) is a valid {@code int}. Any further
     * fields are ignored. Rejected lines are reported on stderr.
     *
     * @param line the raw input line
     * @return {@code true} if the line is well-formed, {@code false} otherwise
     */
    private static boolean isWellFormed(String line) {
        if (line != null) {
            String[] fields = line.split(",");
            if (fields.length >= 2) {
                try {
                    Integer.parseInt(fields[1].trim());
                    return true;
                } catch (NumberFormatException ignored) {
                    // Not a number: fall through and report the line as malformed.
                }
            }
        }
        System.err.println("Skipping malformed line (expected word,count): " + line);
        return false;
    }
}
